/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.search;

import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;

import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.tests.util.LuceneTestCase.Nightly;
import org.apache.lucene.tests.util.TimeUnits;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// This test takes approx 30 seconds on a 2012 MacBook Pro running in IntelliJ. In a "real"
// failure there should be a bunch of update threads dumped out, all waiting on
// DefaultSolrCoreState.getIndexWriter,
// DistributedUpdateProcessor.versionAdd(DistributedUpdateProcessor.java:1016) and the like.
// If we see false failures we should probably bump this timeout. See SOLR-7836
/**
 * Runs several writer threads that issue adds, delete-by-queries and hard commits while the test
 * thread repeatedly reloads the core "collection1", then checks that every writer thread has
 * terminated once the reloads are done.
 */
@TimeoutSuite(millis = 7 * TimeUnits.MINUTE)
@Nightly
public class TestReloadDeadlock extends TestRTGBase {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** Initializes the test core from {@code solrconfig-tlog.xml} and {@code schema15.xml}. */
  @BeforeClass
  public static void beforeClass() throws Exception {
    initCore("solrconfig-tlog.xml", "schema15.xml");
  }

  /**
   * Logs the given values at INFO level as one line prefixed with "VERBOSE:" and separated by
   * single spaces. Does nothing unless {@code VERBOSE} is set. {@code null} values are rendered
   * as "(null)".
   *
   * @param args the values to log, in order
   */
  public static void ifVerbose(Object... args) {
    if (!VERBOSE) {
      return;
    }
    StringBuilder sb = new StringBuilder("VERBOSE:");
    for (Object o : args) {
      sb.append(' ');
      sb.append(o == null ? "(null)" : o.toString());
    }
    log.info("{}", sb);
  }

  /**
   * Starts a random number of writer threads and reloads the core a fixed number of times while
   * they run. Each writer loops until the shared {@code reloads} counter is no longer positive;
   * the test then waits up to 10 seconds per thread and asserts that none is still alive.
   */
  @Test
  public void testReloadDeadlock() throws Exception {
    clearIndex();
    assertU(commit());

    final int commitPercent = 5 + random().nextInt(5);
    final int deleteByQueryPercent = 20 + random().nextInt(20);
    final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(50));
    int nWriteThreads = 5 + random().nextInt(10);

    // query variables
    final AtomicLong reloads =
        new AtomicLong(50); // number of reloads. Increase this number to force failure.

    ifVerbose(
        "commitPercent",
        commitPercent,
        "deleteByQueryPercent",
        deleteByQueryPercent,
        "ndocs",
        ndocs,
        "nWriteThreads",
        nWriteThreads,
        "reloads",
        reloads);

    initModel(ndocs);

    // Guards the commit branch so that at most one writer performs a hard commit at a time.
    final AtomicBoolean areCommitting = new AtomicBoolean();

    List<Thread> threads = new ArrayList<>();

    // Source of the explicit, monotonically increasing versions sent with every update.
    final AtomicLong testVersion = new AtomicLong(0);

    for (int i = 0; i < nWriteThreads; i++) {
      Thread thread =
          new Thread("WRITER" + i) {
            // Per-thread generator, seeded from the test's random() when the thread is created.
            Random rand = new Random(random().nextInt());

            @Override
            public void run() {
              try {
                while (reloads.get() > 0) {
                  int oper = rand.nextInt(100);

                  if (oper < commitPercent) {
                    if (areCommitting.compareAndSet(false, true)) {
                      try {
                        Map<Integer, DocInfo> newCommittedModel;
                        long version;

                        synchronized (TestReloadDeadlock.this) {
                          newCommittedModel = new HashMap<>(model); // take a snapshot
                          version = snapshotCount++;
                        }

                        ifVerbose("hardCommit start");
                        assertU(commit());
                        ifVerbose("hardCommit end");

                        synchronized (TestReloadDeadlock.this) {
                          // install this model snapshot only if it's newer than the current one
                          if (version >= committedModelClock) {
                            ifVerbose("installing new committedModel version=" + version);
                            committedModel = newCommittedModel;
                            committedModelClock = version;
                          }
                        }
                      } finally {
                        // Release the flag even when the commit throws, so a failed commit
                        // cannot leave it permanently set.
                        areCommitting.set(false);
                      }
                    }
                    continue;
                  }

                  int id;

                  if (rand.nextBoolean()) {
                    id = rand.nextInt(ndocs);
                  } else {
                    id = lastId; // reuse the last ID half of the time to force more race conditions
                  }

                  // set the lastId before we actually change it sometimes to try and
                  // uncover more race conditions between writing and reading
                  boolean before = rand.nextBoolean();
                  if (before) {
                    lastId = id;
                  }

                  DocInfo info = model.get(id);

                  long val = info.val;
                  long nextVal = Math.abs(val) + 1;

                  long version = testVersion.incrementAndGet();

                  // yield after getting the next version to increase the odds of updates happening
                  // out of order
                  if (rand.nextBoolean()) Thread.yield();

                  if (oper < commitPercent + deleteByQueryPercent) {
                    deleteByQuery(id, nextVal, version);
                  } else {
                    addDoc(id, nextVal, version);
                  }

                  if (!before) {
                    lastId = id;
                  }
                }
              } catch (Throwable e) {
                // A non-positive counter ends every writer loop and the reload loop as well.
                reloads.set(-1L);
                log.error("", e);
                throw new RuntimeException(e);
              }
            }
          };

      threads.add(thread);
    }

    for (Thread thread : threads) {
      thread.start();
    }

    // The reload operation really doesn't need to happen from multiple threads, we just want it
    // firing pretty often.
    try {
      while (reloads.get() > 0) {
        Thread.sleep(10 + random().nextInt(250));
        reloads.decrementAndGet();
        h.getCoreContainer().reload("collection1");
      }
    } finally {
      // If the loop exits with an exception (failed reload, interrupted sleep) the counter is
      // still positive and the writers would keep running; zero it so they stop.
      if (reloads.get() > 0) {
        reloads.set(0L);
      }
    }

    try {
      for (Thread thread : threads) {
        thread.join(10000); // Normally they'll all return immediately (or close to that).
      }
    } catch (InterruptedException ie) {
      fail("Shouldn't have sat around here this long waiting for the threads to join.");
    }
    for (Thread thread : threads) { // Probably a silly test, but what the heck.
      assertFalse(
          "All threads should be dead, but at least thread " + thread.getName() + " is not",
          thread.isAlive());
    }
  }

  /**
   * Adds a document with an explicit {@code _version_}, sending the update with {@code
   * DISTRIB_UPDATE_PARAM} set to {@code FROM_LEADER}, and records it in {@code model} if the
   * version is newer than the one currently recorded for that id.
   *
   * @param id the document id
   * @param nextVal the value stored in {@code FIELD}
   * @param version the version sent with the update
   * @throws Exception if the update fails
   */
  private void addDoc(int id, long nextVal, long version) throws Exception {
    ifVerbose("adding id", id, "val=", nextVal, "version", version);

    Long returnedVersion =
        addAndGetVersion(
            sdoc(
                "id",
                Integer.toString(id),
                FIELD,
                Long.toString(nextVal),
                "_version_",
                Long.toString(version)),
            params(DISTRIB_UPDATE_PARAM, FROM_LEADER));
    // A version may or may not be returned; if one is, it must be the one we sent.
    if (returnedVersion != null) {
      assertEquals(version, returnedVersion.longValue());
    }

    // only update model if the version is newer
    synchronized (model) {
      DocInfo currInfo = model.get(id);
      if (version > currInfo.version) {
        model.put(id, new DocInfo(version, nextVal));
      }
    }

    ifVerbose("adding id", id, "val=", nextVal, "version", version, "DONE");
  }

  /**
   * Issues a delete-by-query for {@code id:<id>} with the negated version as {@code _version_}
   * and {@code DISTRIB_UPDATE_PARAM} set to {@code FROM_LEADER}. If the version is newer than the
   * one currently recorded for that id, {@code model} is updated with the version and the negated
   * value.
   *
   * @param id the document id to delete
   * @param nextVal the value whose negation is recorded in the model
   * @param version the (positive) version; it is sent negated with the delete
   * @throws Exception if the update fails
   */
  private void deleteByQuery(int id, long nextVal, long version) throws Exception {
    ifVerbose("deleteByQuery id", id, "val=", nextVal, "version", version);

    Long returnedVersion =
        deleteByQueryAndGetVersion(
            "id:" + id,
            params("_version_", Long.toString(-version), DISTRIB_UPDATE_PARAM, FROM_LEADER));

    // TODO: returning versions for these types of updates is redundant
    // but if we do return, they had better be equal
    if (returnedVersion != null) {
      assertEquals(-version, returnedVersion.longValue());
    }

    // only update model if the version is newer
    synchronized (model) {
      DocInfo currInfo = model.get(id);
      if (Math.abs(version) > Math.abs(currInfo.version)) {
        model.put(id, new DocInfo(version, -nextVal));
      }
    }

    ifVerbose("deleteByQuery id", id, "val=", nextVal, "version", version, "DONE");
  }
}
